理解 Reactor 模式的 3 个重要概念:Publisher、Subscriber和Subscription
在Reactor中,有三个重要的概念:Publisher、Subscriber和Subscription。
-
Publisher:Publisher是一个接口,在Reactor中用于表示数据源。它可以产生数据流,将数据推送给订阅者。Publisher接口定义了subscribe()方法,用于订阅并连接Subscriber和数据源。
-
Subscriber:Subscriber是一个接口,用于接收数据流并处理数据。Subscriber通过实现onSubscribe()、onNext()、onError()和onComplete()等方法来定义对数据流的处理逻辑。当Subscriber通过subscribe()方法订阅Publisher时,它将与Publisher建立连接,并开始接收数据流。
-
Subscription:Subscription是一个接口,用于表示订阅关系。当Subscriber订阅Publisher时,Publisher将创建一个Subscription实例。Subscription接口定义了request()和cancel()方法,用于请求更多的数据或取消订阅。
这三个概念共同构成了Reactor中的反应式流处理模型。Publisher负责产生数据流,Subscriber负责接收和处理数据流,而Subscription则管理Subscriber和Publisher之间的订阅关系。通过这种模型,Reactor提供了一种非阻塞、异步的编程模式,使得处理数据流变得简单、灵活和高效。
下面是一个简单的示例代码,演示了如何实现Publisher、Subscriber和Subscription接口:
1 | import org.reactivestreams.Publisher; |
在这个示例中,我们创建了一个简单的Publisher实现,它通过计数器模拟产生数据流,并将数据推送给Subscriber。Subscriber实现中,我们在onSubscribe()方法中请求一个数据,并在onNext()方法中处理接收到的数据,然后再请求下一个数据,直到数据流结束。当数据流完成后,我们在onComplete()方法中处理完成信号。
通过实现这三个接口,我们可以方便地创建和控制反应式数据流,实现自定义的数据处理逻辑。在实际应用中,我们可以使用Reactor提供的各种操作符和工具类来构建更复杂的流处理逻辑。
===============================
个人理解:
Publisher 只定义了 subscribe() 方法,他是用来为 Subscriber 建立订阅会话用的(Subscription 就是会话),就类似 kafka 的消费组的概念,也可以比喻成我们跑去消息中心建了一条光缆。
从这个定义我们可以推理出:
- Publisher 怎么生产数据是不受约束的。
- Subscription 保存在哪里没约束。
- Subscriber 保存在哪也没有约束。
- Publisher 怎么把生产消息推送给 Subscription 也没约束。
- Publisher 消息的类型也没约束。
- Publisher 是单例还是多例,也没规定。
- Publisher 没有约定已发送的消息就必须删除。
Subscriber 负责的核心方法是 onNext(), 这个方法可以看成是 消息消费者的一个接收消息的 API。
从这个定义我们可以推理出:
- 给对象最核心的方法是 onNext()。( 其他 onSubscribe onError onComplete 均可有可无)
- onNext() 没有定义重试和事务等机制, 因为作者假定了会话的处理是单调稳定的,又或者说作者认为事务由 Subscriber 去保障(Publisher 只把话说一次,能悟不能悟就看 Subscriber 造化 )。
- 他的 onSubscribe onError onComplete 看上去是围绕 Subscriber 的生命周期设计的,其实不是,他围绕的是 Subscription 的生命周期。
- Subscriber 没有约束 onNext() 被调用的时机(由谁触发)。
- Subscriber 没有约束 onNext() 的调用方式(在哪里调用)。
- Subscriber 没有约束 onNext() 处理什么业务,以及 怎么处理。
- onError 也没有约束 在哪里调用,所以几乎你能想到的地方都可以调用, 因为 Subscription 覆盖了所有上下文,他包括了 Publisher 与 Subscriber。
Subscription 只定义了 request() 方法,用来通知 Publisher 发消息给 Subscriber,以及定义了 cancel() 用来通知 Publisher 别发消息给 Subscriber。
从这个定义我们可以推理出:
- request() 没有约束什么时候调用、也没有约束由谁调用。
- request() 没有约束如何通知 Publisher。
- request() 没有约束 Publisher 如何发送消息,仅仅是一种建议。
- request() 没有约束 Publisher 从哪里开始发消息给 Subscriber。
- cancel() 也是一样,非常微不足道,以至于对 Publisher 的约束度很松散。
因此,Publisher 和 Subscription、Subscriber 约束如此的少,所以,他的实现类千奇百怪,学习起来很晦涩,但我们只要记住他是一种生产者消费者模式即可,只是多了一个 Subscription.request() 实现对消费者负载保护罢了(就好像 TCP 拥堵控制算法对网络的保护)。
常规实现:
1.利用装饰者模式嵌套调用。
2.subscribe() 中的 onSubscribe() 执行首次 request()
3.在 request() 中执行消息发送给 Subscriber 的动作,onNext()
4.在 onNext() 中执行 hook 操作。
5.如果由于是装饰者模式,所以在执行完当前的 onNext() 后,还需要执行hook链上其他 Subscriber 的onNext() 消息。
如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !